tg-me.com/pyproglib/6674
Last Update:
Инструмент недели: планирование задач с Rocketry
Rocketry — это фреймворк для планирования задач в Python, который позволяет легко управлять задачами и их выполнением с помощью простых выражений.
Для начала установите Rocketry с помощью pip:
pip install rocketry
Создайте файл, например,
scheduler.py
, и добавьте в него следующий код:from rocketry import Rocketry
from rocketry.conds import daily
app = Rocketry()
@app.task(daily)
def do_daily():
print("Эта задача выполняется каждый день!")
if __name__ == '__main__':
app.run()
Этот код создаёт задачу, которая выполняется каждый день. Планировщик будет работать в фоновом режиме, выполняя задачу каждый день.
После написания кода запустите его с помощью Python:
python scheduler.py
Теперь ваша задача будет выполняться каждый день.
Rocketry позволяет настраивать задачи с использованием различных условий.
@app.task(every("10 seconds"))
def do_continuously():
print("Эта задача выполняется каждую секунду!")
@app.task(daily.after("07:00"))
def do_daily_after_seven():
print("Эта задача выполняется каждый день после 7:00!")
@app.task(hourly & time_of_day.between("22:00", "06:00"))
def do_hourly_at_night():
print("Эта задача выполняется каждый час ночью!")
@app.task(cron("* 2 * * *"))
def do_based_on_cron():
print("Эта задача выполняется по cron-выражению!")
Rocketry поддерживает пайплайнинг задач — передача данных между задачами:
from rocketry.conds import daily, after_success
from rocketry.args import Return
@app.task(daily.after("07:00"))
def do_first():
return 'Hello World'
@app.task(after_success(do_first))
def do_second(arg=Return('do_first')):
print(f"Задача 'do_first' вернула: {arg}")
return 'Hello Python'
Rocketry поддерживает выполнение задач в разных режимах: синхронно, асинхронно, в отдельных потоках и процессах:
@app.task(daily, execution="main")
def do_unparallel():
print("Эта задача выполняется синхронно.")
@app.task(daily, execution="async")
async def do_async():
print("Эта задача выполняется асинхронно.")
@app.task(daily, execution="thread")
def do_on_separate_thread():
print("Эта задача выполняется в отдельном потоке.")
@app.task(daily, execution="process")
def do_on_separate_process():
print("Эта задача выполняется в отдельном процессе.")
Библиотека питониста #буст